Author: 布瓜Pourqu2502854853 | Source: Internet | 2023-08-31 16:52
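Preface: This article was compiled by the editors of 编程笔记 and covers how to extend the Hive Metastore Thrift RPC service interface. We hope it is a useful reference.
Hive has been under development for more than a decade and is widely used. As versions evolve, the protocol interface changes between releases. HMS (Hive Metastore) tries to stay compatible with older clients, but full compatibility is not guaranteed across major versions. For example, from HMS 2 to HMS 3 the index-related interfaces gave way to constraint-related ones. In the figure below, the left side is the HMS 3 interface definition and the right side is the HMS 2 interface definition:
As a result, when a Hive 2 client calls an HMS 3 service, it receives an Invalid Method exception. This can be solved with a Hive Metastore proxy service.
Before describing the Hive Metastore proxy service in detail, it helps to understand how the Hive Metastore service processes requests: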
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
// Wrap the handler so that failed metastore calls are retried.
IHMSHandler handler = HiveMetaStore.newRetryingHMSHandler(baseHandler, conf);
TProcessor processor = new ThriftHiveMetastore.Processor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer
    .Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);
TServer server = new TThreadPoolServer(serverArgs);
server.setServerEventHandler(getServerEventHandler());
server.serve();
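The sample code above starts the Hive Metastore service. Two components in it matter most: TThreadPoolServer and TProcessor.
What TThreadPoolServer does
TThreadPoolServer is mainly responsible for listening for clients and dispatching their requests. When a new client connects, the server submits that connection to an internal worker thread pool and goes straight back to listening. Each worker thread then serves one client exclusively until that client is finished: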
// TThreadPoolServer.class
public class TThreadPoolServer extends TServer {
    // private ExecutorService executorService_;
    public void serve() {
        ...
        while (!this.stopped_) {
            // Block until a client connects, then hand the connection to a worker.
            TTransport client = this.serverTransport_.accept();
            TThreadPoolServer.WorkerProcess wp = new TThreadPoolServer.WorkerProcess(client);
            this.executorService_.execute(wp);
        }
        ...
        this.executorService_.shutdown();
        ...
    }

    private class WorkerProcess implements Runnable {
        // private TTransport client_;
        public void run() {
            // (setup of processor, protocols and eventHandler is omitted in this excerpt)
            do {
                if (eventHandler != null) {
                    eventHandler.processContext(connectionContext, inputTransport, outputTransport);
                }
                // Keep serving requests on this connection until process() returns false.
            } while (!TThreadPoolServer.this.stopped_ && processor.process(inputProtocol, outputProtocol));
            if (this.client_.isOpen()) {
                this.client_.close();
            }
        }
    }
}
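The accept-and-dispatch pattern above can be sketched with nothing but the JDK. This is a minimal illustration, not Thrift's implementation: `FakeConnection` and `MiniThreadPoolServer` are hypothetical names, a list of connections stands in for `accept()`, and a fixed-size pool is assumed for simplicity.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a client connection that sends a fixed number of requests. */
class FakeConnection {
    private final AtomicInteger remaining;
    volatile boolean open = true;

    FakeConnection(int requests) {
        this.remaining = new AtomicInteger(requests);
    }

    /** Handles one request; returns false once the client has nothing left to send. */
    boolean processNext() {
        return remaining.getAndDecrement() > 0;
    }
}

class MiniThreadPoolServer {
    /** Serves every connection and returns the total number of requests handled. */
    static int serve(List<FakeConnection> incoming, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger handled = new AtomicInteger();
        for (FakeConnection client : incoming) {   // stands in for the accept() loop
            pool.execute(() -> {
                // One worker owns one connection until it is drained, then closes it.
                while (client.processNext()) {
                    handled.incrementAndGet();
                }
                client.open = false;
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

In this sketch a worker stays bound to its connection until the connection is drained, so with `workers` set to 1 the second client waits for the first to finish. That is the trade-off of a thread-per-connection model.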
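What TProcessor does
The previous section covered WorkerProcess: TThreadPoolServer hands each client connection as a whole to the TProcessor. It did not explain how processor.process works out which method is being called. Telling method calls apart is also the TProcessor's job. Taking ThriftHiveMetastore.Processor as an example, it defines the mapping of supported interface methods: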
// ThriftHiveMetastore.class
public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
    }

    // Note: this constructor accepts a caller-supplied processMap.
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    // Registers every supported RPC method under its wire name.
    private static <I extends Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        processMap.put("create_catalog", new ThriftHiveMetastore.Processor.create_catalog());
        processMap.put("alter_catalog", new ThriftHiveMetastore.Processor.alter_catalog());
        processMap.put("get_catalog", new ThriftHiveMetastore.Processor.get_catalog());
        processMap.put("get_catalogs", new ThriftHiveMetastore.Processor.get_catalogs());
        processMap.put("drop_catalog", new ThriftHiveMetastore.Processor.drop_catalog());
        processMap.put("create_database", new ThriftHiveMetastore.Processor.create_database());
        processMap.put("get_database", new ThriftHiveMetastore.Processor.get_database());
        processMap.put("drop_database", new ThriftHiveMetastore.Processor.drop_database());
        ...
    }
}
In other words, only methods registered in processMap are recognized; any other method name results in an exception. This logic lives in TBaseProcessor, the parent class of Processor:
public abstract class TBaseProcessor<I> implements TProcessor {
    private final I iface;
    private final Map<String, ProcessFunction<I, ? extends TBase>> processMap;

    protected TBaseProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processFunctionMap) {
        this.iface = iface;
        this.processMap = processFunctionMap;
    }

    public Map<String, ProcessFunction<I, ? extends TBase>> getProcessMapView() {
        return Collections.unmodifiableMap(this.processMap);
    }

    public boolean process(TProtocol in, TProtocol out) throws TException {
        TMessage msg = in.readMessageBegin();
        // Look up the handler by the method name carried in the message header.
        ProcessFunction fn = (ProcessFunction) this.processMap.get(msg.name);
        if (fn == null) {
            // Unknown method: skip the argument struct and reply with an exception.
            TProtocolUtil.skip(in, (byte) 12);  // 12 = TType.STRUCT
            in.readMessageEnd();
            TApplicationException x = new TApplicationException(1, "Invalid method name: '" + msg.name + "'");  // 1 = UNKNOWN_METHOD
            out.writeMessageBegin(new TMessage(msg.name, (byte) 3, msg.seqid));  // 3 = TMessageType.EXCEPTION
            x.write(out);
            out.writeMessageEnd();
            out.getTransport().flush();
            return true;
        } else {
            fn.process(msg.seqid, in, out, this.iface);
            return true;
        }
    }
}
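The lookup-or-reject logic of process can be reduced to a few lines of plain Java. The sketch below is an illustration under simplifying assumptions: `MiniProcessor` is a hypothetical class, handlers are plain `Function<String, String>` values rather than Thrift `ProcessFunction` objects, and replies are strings rather than protocol messages.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified model of name-based RPC dispatch. */
class MiniProcessor {
    private final Map<String, Function<String, String>> processMap = new HashMap<>();

    /** Registers a handler under the method name clients will send. */
    void register(String method, Function<String, String> fn) {
        processMap.put(method, fn);
    }

    /** Dispatches by method name; an unknown name yields an error reply instead of a crash. */
    String process(String method, String arg) {
        Function<String, String> fn = processMap.get(method);
        if (fn == null) {
            return "EXCEPTION: Invalid method name: '" + method + "'";
        }
        return "REPLY: " + fn.apply(arg);
    }
}
```

Registering only `get_database` and then calling `get_indexes` returns the error reply, which mirrors what a Hive 2 client sees when it calls an index method on an HMS 3 server. The connection itself stays usable, just as the real process returns true after writing the exception.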
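Inside process, the method name sent by the client is looked up in processMap; if there is no entry, an exception is returned to the client. So how can an unregistered method be added? Look at the constructors of Processor: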
// ThriftHiveMetastore.class
public static class Processor<I extends Iface> extends com.facebook.fb303.FacebookService.Processor<I> implements TProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThriftHiveMetastore.Processor.class.getName());

    public Processor(I iface) {
        super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
    }

    // Note: this constructor accepts a caller-supplied processMap.
    protected Processor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }
    ...
}
A processMap can be passed in, but that constructor is protected, so it cannot be used to create an object directly. The class can only be extended through inheritance, for example:
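public static class MetastoreProcessor<I extends ThriftHiveMetastore.Iface> extends ThriftHiveMetastore.Processor<I> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MetastoreProcessor.class.getName());

    public MetastoreProcessor(I iface) {
        // Pre-populate the map with the extra methods, then let the parent add its own.
        super(iface, getProcessMap(new HashMap<String, ProcessFunction<I, ? extends TBase>>()));
    }

    protected MetastoreProcessor(I iface, Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        super(iface, getProcessMap(processMap));
    }

    // add_index, alter_index, etc. are ProcessFunction implementations that are not shown in this excerpt.
    private static <I extends ThriftHiveMetastore.Iface> Map<String, ProcessFunction<I, ? extends TBase>> getProcessMap(Map<String, ProcessFunction<I, ? extends TBase>> processMap) {
        processMap.put("add_index", new add_index());
        processMap.put("alter_index", new alter_index());
        processMap.put("drop_index_by_name", new drop_index_by_name());
        processMap.put("get_index_by_name", new get_index_by_name());
        processMap.put("get_indexes", new get_indexes());
        processMap.put("get_index_names", new get_index_names());
        return processMap;
    }
}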
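The inheritance trick is easier to see without the Thrift types. The following is a minimal sketch with hypothetical classes `BaseProcessor` and `LegacyAwareProcessor`: the base class fills in its built-in methods from a private static helper, and the subclass uses the protected constructor to pass in a map that already holds the extra entries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Hypothetical base class: its built-in methods are registered in the constructor chain. */
class BaseProcessor {
    private final Map<String, Function<String, String>> processMap;

    public BaseProcessor() {
        this(new HashMap<>());
    }

    // Protected: only subclasses may supply a pre-populated map.
    protected BaseProcessor(Map<String, Function<String, String>> processMap) {
        this.processMap = addBuiltIns(processMap);
    }

    private static Map<String, Function<String, String>> addBuiltIns(Map<String, Function<String, String>> m) {
        m.put("get_database", name -> "db:" + name);
        return m;
    }

    /** Names of all methods this processor can dispatch. */
    Set<String> methods() {
        return processMap.keySet();
    }
}

/** Hypothetical subclass that registers an extra, legacy method name. */
class LegacyAwareProcessor extends BaseProcessor {
    public LegacyAwareProcessor() {
        super(addLegacy(new HashMap<>()));
    }

    private static Map<String, Function<String, String>> addLegacy(Map<String, Function<String, String>> m) {
        m.put("get_indexes", table -> "[]");  // placeholder behaviour, assumed for illustration
        return m;
    }
}
```

Because the parent adds its entries after the subclass has filled the map, a subclass entry that reuses a built-in name would be overwritten. The same ordering applies to the MetastoreProcessor shown above, so this approach suits adding new method names rather than replacing existing ones.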
Then swap in the new TProcessor instance and rebuild the service:
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("metaserver", conf, false);
IHMSHandler handler = HiveMetaStore.newRetryingHMSHandler(baseHandler, conf);
// Use the extended processor so that the index methods are recognized again.
TProcessor processor = new MetastoreProcessor<>(handler);
final TThreadPoolServer.Args serverArgs = new TThreadPoolServer
    .Args(serverTransport)
    .transportFactory(new TTransportFactory())
    .protocolFactory(new TBinaryProtocol.Factory())
    .processor(processor);